Mapper(key k, tuple t)
    tuple g = project(t)
    emit(g, null)

# The Reducer is needed only for duplicate elimination
Reducer(tuple t, array n)
    emit(t, null)
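In Spark the same pattern collapses to a `map` followed by `distinct()`. A minimal spark-shell sketch, where the `Row` case class and the projected fields `(a, b)` are illustrative assumptions, not from the original:

```scala
// Hypothetical record type; the projection keeps only fields a and b
case class Row(a: Int, b: String, c: Double)

val rows = sc.parallelize(Seq(Row(1, "x", 0.5), Row(1, "x", 0.7), Row(2, "y", 0.1)))

rows
  .map(r => (r.a, r.b))  // Mapper: project each tuple
  .distinct()            // Reducer's job: duplicate elimination
  .collect()             // Array((1,x), (2,y))
```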
url1 {jaguar, fast, v8}
url2 {jaguar, fast, teeth, tail}
jaguar -> {url1, url2}
fast -> {url1, url2}
v8 -> {url1}
teeth -> {url2}
tail -> {url2}
Mapper(docID id, doc d)
    for each term t in d:
        emit(t, id)

Reducer(term t, docIDs [id1, id2, ..., idn])
    emit(t, flattened list of docIDs)
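As a sketch, this translates to a `flatMap` emitting (term, docID) pairs followed by `groupByKey`. The documents below mirror the url1/url2 example (spark-shell style, assuming `sc` is in scope):

```scala
val docs = sc.parallelize(Seq(
  ("url1", "jaguar fast v8"),
  ("url2", "jaguar fast teeth tail")
))

docs
  .flatMap { case (id, text) =>
    text.split("\\s+").map(term => (term, id))  // Mapper: emit (term, docID)
  }
  .groupByKey()  // Reducer: gather all docIDs per term
  .collect()     // roughly: (jaguar, [url1, url2]), (v8, [url1]), (teeth, [url2]), ...
```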
[ketchup, mustard, relish]
[ketchup, buns]
N = 4 (number of distinct items)
  | k | m | r | b |
---|---|---|---|---|
k | X | 1 | 1 | 1 |
m | X | X | 1 | 0 |
r | X | X | X | 0 |
b | X | X | X | X |
([k, m], 1)
([k, r], 1)
([k, b], 1)
([m, r], 1)
([m, b], 0)
([r, b], 0)
Number of elements in the output:

\[\begin{align}
|output| &= \binom{N}{2}\\
&= \frac{4!}{2!\,2!}\\
&= 6
\end{align}\]
# Each input value is one receipt, i.e., a list of items
Mapper(null, items [i1, i2, ..., ik])
    for each item i in [i1, ..., ik]:
        for each item j in [i1, ..., ik] where j > i:
            emit([i, j], 1)

Reducer(pair [i, j], counts [c1, ..., cn])
    s = sum of [c1, ..., cn]
    emit([i, j], s)
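A Spark version of the pairs approach might look like the sketch below, assuming each receipt arrives as a list of item names and interpreting `j > i` as lexicographic order so each unordered pair is emitted exactly once:

```scala
val receipts = sc.parallelize(Seq(
  Seq("ketchup", "mustard", "relish"),
  Seq("ketchup", "buns")
))

receipts
  .flatMap { items =>
    for {
      i <- items
      j <- items if j > i  // lexicographic order: each unordered pair once
    } yield ((i, j), 1)    // Mapper: emit ([i, j], 1)
  }
  .reduceByKey(_ + _)      // Reducer: sum the counts per pair
  .collect()               // e.g. ((ketchup,mustard),1), ((ketchup,relish),1), ...
```

Pairs that never co-occur (like mustard/buns) are simply never emitted, so they carry an implicit count of 0.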
Mapper(null, items [i1, i2, ..., ik])
    for each item i in [i1, ..., ik]:
        H = new Map from item -> counter, initially zero
        for each item j in [i1, ..., ik] where j > i:
            H{j} = H{j} + 1
        # H is a stripe, e.g.
        # (ketchup, {mustard: 1, relish: 1})
        emit(i, H)

Reducer(item i, stripes [H1, H2, ..., Hm])
    H = merge_sum([H1, H2, ..., Hm])  # element-wise sum of the stripes
    for each item j in keys(H):
        emit([i, j], H{j})
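The stripes variant, sketched in the same style and reusing the `receipts` RDD from above; here `merge_sum` becomes an element-wise map merge inside `reduceByKey`:

```scala
receipts
  .flatMap { items =>
    items.map { i =>
      // Build the stripe for item i: counts of every co-occurring item j > i
      val stripe = items.filter(_ > i).groupBy(identity).map { case (j, js) => (j, js.size) }
      (i, stripe)
    }
  }
  .reduceByKey { (h1, h2) =>
    // merge_sum: element-wise sum of two stripes
    (h1.keySet ++ h2.keySet).map(j => j -> (h1.getOrElse(j, 0) + h2.getOrElse(j, 0))).toMap
  }
  .flatMap { case (i, h) => h.map { case (j, c) => ((i, j), c) } }  // unpack stripes into pairs
  .collect()
```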
RDD 1 | RDD 2 |
---|---|
url1 val1 -> executor 1 | url1 val1' -> executor 1 |
url2 val2 -> executor 1 | url2 val2' -> executor 1 |
url3 val3 -> executor 2 | url3 val3' -> executor 2 |

Assuming both RDDs are partitioned by key,

rdd1.join(rdd2)

is a narrow dependency: every key's values from both RDDs already live on the same executor, so no shuffle is needed.
RDD 1 | RDD 2 |
---|---|
url1 val1 -> executor 3 | url1 val1' -> executor 1 |
url2 val2 -> executor 1 | url2 val2' -> executor 1 |
url3 val3 -> executor 2 | url3 val3' -> executor 2 |

Here the same key is not always on the same executor: RDD 1 is partitioned arbitrarily, while RDD 2 is partitioned by key.

rdd1.join(rdd2)
: the code looks identical to the previous case; whether the join is narrow or wide depends entirely on how the data is partitioned.

This is a wide dependency because a shuffle is required before joining.
rdd1.map(x => (x._2, x._1)).join(rdd2)

will almost certainly be a wide dependency, since the map re-keys the RDD by its values and discards any existing partitioning.
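One way to make sure a join stays narrow is to co-partition both sides explicitly before joining. A sketch, assuming `rdd1` and `rdd2` are key-value RDDs as in the tables above:

```scala
import org.apache.spark.HashPartitioner

val part = new HashPartitioner(4)

// Co-partition both sides by key and cache the results; the join can then
// run without a shuffle, since matching keys already sit on the same executor.
val left  = rdd1.partitionBy(part).cache()
val right = rdd2.partitionBy(part).cache()
val joined = left.join(right)  // narrow dependency
```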
val dat = sc.textFile("sample_input")

dat
  .flatMap(_.split("\\s+"))  // Tokenize on whitespace
  .filter(_.length > 4)      // Keep only words longer than 4 characters (drops "a", "the", etc.)
  .map(x => (x, 1))          // Initial count of 1 per occurrence
  .reduceByKey(_ + _)        // Sum the counts per word
  .map(x => (x._2, x._1))    // Swap key and value so we can sort by count
  .sortByKey(false)          // false = descending order
  .take(15)                  // Take the 15 most frequent words